实践应用|Python自动化连接FTP批量下载指定文件
前言
有个小姐姐要从历史数据日志里根据一定的规则筛选一批数据,这批数据中有对局战场id字段,再根据这些id转化为文件名,连接远程FTP搜索该文件并下载到本地,然后打开文件删除前5行并在第6行行首添加一个字母,最后将该文件后缀名修改。 一天处理50+个这样的文件转化需求,简单算了下,差不多刚好要一天时间吧!!
但是,这怎么可以!!!!!!
于是,我主动提出了救援支持,结果就是现在基本上10分钟以内可以搞定以上操作!!
需求梳理
①数据处理:按照一定规则从历史数据日志筛选一批数据
②确定文件及目录:根据一定规则确定文件名及所在FTP子目录(远程FTP按照日期建子目录存储的文件)
③连接FTP:连接远程FTP
④批量下载文件:依据②中文件名及目录循环切换FTP远程目录并下载文件
⑤处理文件:打开文件并删除前5行 在文件第6行行首添加字母,保存时修改文件后缀(格式)
1、数据处理
历史数据日志有多份,存放在同一个文件夹,文件格式是csv
使用pandas进行数据处理操作
处理步骤:
读取数据合并的时候同步按照既定条件进行数据筛选
选择需要用到的字段
原始数据长啥样?
pandas.csv()读取数据后,我们使用info可以发现原始日志包含了71个字段,同时单个文件200MB+38万条数据。。
1>>>df.info()
2<class 'pandas.core.frame.DataFrame'>
3RangeIndex: 386418 entries, 0 to 386417
4Data columns (total 71 columns):
5 # Column Non-Null Count Dtype
6--- ------ -------------- -----
7 0 @timestamp 386418 non-null object
8 1 appid 386418 non-null int64
9 2 assist 386418 non-null int64
10 3 battle_sum 386418 non-null int64
11 4 battleid 386418 non-null int64
12...
13 69 usernum 386418 non-null int64
14 70 victory 386418 non-null int64
15dtypes: float64(7), int64(54), object(10)
16memory usage: 209.3+ MB
筛选既定条件数据
考虑到我们一次性处理的文件数不止一个,所以在读取原始日志后可以先把条件筛选工作做了再合并。
处理完之后,我们发现文件大小降低为7.9KB,很轻松的感觉有木有~
1>>>df = df[df['modetid']>=117 ]
2>>>df = df[df['usernum']>=10 ]
3>>>df = df[df['pentakill']>=1 ]
4>>>df.info()
5<class 'pandas.core.frame.DataFrame'>
6Int64Index: 14 entries, 117184 to 384421
7Data columns (total 71 columns):
8 # Column Non-Null Count Dtype
9--- ------ -------------- -----
10 0 @timestamp 14 non-null object
11 1 appid 14 non-null int64
12 2 assist 14 non-null int64
13 3 battle_sum 14 non-null int64
14 4 battleid 14 non-null int64
15 ...
16 69 usernum 14 non-null int64
17 70 victory 14 non-null int64
18dtypes: float64(7), int64(54), object(10)
19memory usage: 7.9+ KB
选择需要用到的列
实际上我们在后续处理中需要用到的列比较少,咱们一并处理了吧
1>>>df = df[['@timestamp','battleid','herotid','quadrakill','pentakill']]
2>>>df.info()
3<class 'pandas.core.frame.DataFrame'>
4Int64Index: 14 entries, 117184 to 384421
5Data columns (total 5 columns):
6 # Column Non-Null Count Dtype
7--- ------ -------------- -----
8 0 @timestamp 14 non-null object
9 1 battleid 14 non-null int64
10 2 herotid 14 non-null int64
11 3 quadrakill 14 non-null int64
12 4 pentakill 14 non-null int64
13dtypes: int64(4), object(1)
14memory usage: 672.0+ bytes
处理过程封装待用
文件合并操作详见下面代码,使用的是pd.concat()
以下是全部代码:
1import os
2import pandas as pd
3import time
4
5def concatData():
6 start_time = time.perf_counter()
7 print('正在读取原始对局日志......')
8 location = './对局日志'
9 fileList = []
10 n = 0
11 #合并数据
12 for fileName in os.walk(location):
13 for table in fileName[2]:
14 path = fileName[0] + '/' +table
15 Li = pd.read_csv(path,header=0)
16 #通过指定规则筛选数据
17 Li = Li[Li['modetid']>=117 ]
18 Li = Li[Li['usernum']>=10 ]
19 Li = Li[Li['pentakill']>=1 ]
20 n = n+1
21 fileList.append(Li)
22 print('第'+str(n)+'个表格已经合并')
23 print('在该目录下有%d个文件'%len(fileList))
24 print('正在合并,请稍等......')
25 res = pd.concat(fileList,ignore_index = True)
26 print('合并完成......')
27 #选择需要用到的字段
28 res = res[['@timestamp','battleid','herotid','quadrakill','pentakill']]
29 use_time = time.perf_counter() - start_time
30 print('合并数据消耗时长:{0:.2f} 秒\n'.format(use_time))
31 return res
2、确定文件及目录
在上一步数据处理后,我们得到的数据长下面这样:
1>>>df.head()
2 @timestamp battleid herotid quadrakill pentakill
3117184 2020-05-27 13:05:11 110853427027 14 0 1
4130197 2020-05-27 13:49:10 110853428327 27 0 1
5151473 2020-05-27 15:18:37 110853430538 17 0 1
6185862 2020-05-27 17:39:53 110853434015 14 0 1
7194350 2020-05-27 18:01:38 110853434646 22 0 1
在远程FTP里文件存储在二级目录里,二级目录是以日期命令,在历史数据日志里有每个对局发生的时间,因此可以通过这些字段行程 改文件及所在目录关系。
由于时间字段@timestamp是object格式,且形如“2020-05-27 13:05:11”,我们直接采用字符串的split()方法即可获得日期目录。
1df['@timestamp'] = df['@timestamp'].str.split(' ').str[0]
文件格式为str(df.iloc[i][1])+'.bd'
3、连接FTP
Python中默认安装的ftplib模块,常见的函数列举如下:
参考文档:https://docs.python.org/3/library/ftplib.html
1 **ftp登录连接**
2from ftplib import FTP #加载ftp模块
3ftp=FTP() #设置变量
4ftp.set_debuglevel(2) #打开调试级别2,显示详细信息
5ftp.connect("IP","port") #连接的ftp sever和端口
6ftp.login("user","password") #连接的用户名,密码
7print ftp.getwelcome() #打印出欢迎信息
8ftp.cmd("xxx/xxx") #进入远程目录
9bufsize=1024 #设置的缓冲区大小
10filename="filename.txt" #需要下载的文件
11file_handle=open(filename,"wb").write #以写模式在本地打开文件
12ftp.retrbinaly("RETR filename.txt",file_handle,bufsize) #接收服务器上文件并写入本地文件
13ftp.set_debuglevel(0) #关闭调试模式
14ftp.quit() #退出ftp
15**ftp相关命令操作**
16ftp.cwd(pathname) #设置FTP当前操作的路径
17ftp.dir() #显示目录下所有目录信息
18ftp.nlst() #获取目录下的文件
19ftp.mkd(pathname) #新建远程目录
20ftp.pwd() #返回当前所在位置
21ftp.rmd(dirname) #删除远程目录
22ftp.delete(filename) #删除远程文件
23ftp.rename(fromname, toname)#将fromname修改名称为toname。
24ftp.storbinaly("STOR filename.txt",file_handel,bufsize) #上传目标文件
25ftp.retrbinary("RETR filename.txt",file_handel,bufsize) #下载FTP文件
获取远程FTP地址端口及账号密码后,即可进行连接
1from ftplib import FTP
2
3def ftpConnect():
4 #实例化一个fto对象
5 ftp =FTP()
6 #ftp地址及账号密码
7 host = 'xxx'
8 port = xxx
9 user_name = 'xxx'
10 password = 'xxx'
11 #连接ftp
12 ftp.connect(host ,port)
13 ftp.login(user_name,password)
14 #打印欢迎消息
15 print (ftp.getwelcome())
16 #设置被动模式(0是主动,1是被动)
17 ftp.set_pasv(1)
18 print('ftp连接成功\n')
19 return ftp
4、批量下载文件
下载文件前需要先切换到该文件所在的文件目录,然后再进行文件下载
切换文件目录:ftp.cwd(pathname)
下载文件:ftp.retrbinary("RETR filename.txt",file_handel,bufsize) ,filename.txt是我们需要下载的文件
下载文件前先以写模式在本地打开文件file_handle=open(filename,"wb").write
1def ftpDownload(ftp,df):#df存放需要下载的文件及其所在目录
2 start_time = time.perf_counter()
3 n = 0
4 m = 0
5 print('正在下载文件')
6 for i in range(len(df.index)):
7
8 #获取文件所在目录
9 pathname = df.iloc[i][0]
10 #切换到文件所在目录
11 ftp.cwd(pathname)
12 #打印一级文件目录
13 #files = ftp.dir()
14 #获取目录下的所有文件
15 #file_list = ftp.nlst()
16 #设置本文件下载存储所在路径(./是当前文件所在路径)
17 local_path="./录像源文件/"
18 #为准备下载到本地的文件,创建文件对象
19 remote_file_name = str(df.iloc[i][1]) +'.bd'
20 try:
21 local_file_name=local_path + os.path.basename(remote_file_name)
22 file = open(local_file_name, 'wb')
23 #从FTP服务器下载文件到前一步创建的文件对象,其中写对象为file.write,1024是缓冲区大小
24 ftp.retrbinary('RETR '+remote_file_name,file.write,1024)
25 #关闭下载到本地的文件
26 file.close()
27 except :
28 m = m+1
29 print(f'\r共{m}个文件下载失败,共{n}个文件下载完成',end = ' ')
30 else:
31 n = n+1
32 print(f"\r共{m}个文件下载失败,共{n}个文件下载完成",end = ' ')
33 ftp.cwd('/')
34 #关闭FTP客户端连接
35 ftp.close()
36 print(f'\n共{n}个有效对局文件~')
37 print('\nftp连接已关闭')
38 use_time = time.perf_counter() - start_time
39 print('FTP数据下载消耗时长:{0:.2f} 秒\n'.format(use_time))
5、处理文件
由于需要处理的文件是字符串类型是bytes,在打开的时候需要用“rb”,删除前5行简单用del即可。
在第六行行首写入字符时,需要注意以b作为前缀。
1def fileHandle(df):
2 print('正在进行数据转化')
3 m= 0
4 n= 0
5 for i in range(len(df.index)):
6 try:
7 refile = './录像源文件/' + str(df.iloc[i][1]) + '.bd'
8 fre = open(refile,'rb')
9 a = fre.readlines()
10 fre.close()
11 del a[0:5]
12
13 wrfile = './录像可执行文件/' + f'{str(i)}-' + df.iloc[i][5]+'五杀'+'.rep'
14 fwr = open(wrfile, 'wb')
15 a[0]= b'$'+a[0]
16 fwr.writelines(a)
17 fwr.close()
18 except :
19 m = m+1
20 print(f'\r共{m}个文件转化失败,共{n}个文件转化完成',end = ' ')
21 else:
22 n = n+1
23 print(f"\r共{m}个文件转化失败,共{n}个文件转化完成",end = ' ')
6、最后,让脚本运行起来
没啥别的,一步一步走,我们发现执行效率还蛮高的。
中间遇到过 "error_perm:550 Failed to open file. "的问题,后来发现是远程文件目录不对或者本地文件没有读写权限导致的。
1if __name__ == '__main__':
2 starttime = time.perf_counter()
3 #合并数据并过滤
4 res = concatData()
5 #关联英雄名称并处理日期与ftp目录一致
6 #df = mergeData(res)
7 #登录ftp
8 ftp = ftpConnect()
9 #下载指定文件
10 ftpDownload(ftp,df)
11 #转化数据为可播放文件
12 fileHandle(df)
13 usetime = time.perf_counter() - starttime
14 print('\n本次累积消耗时长:{0:.2f} 秒\n'.format(usetime))
人工处理可能需要一天时间,脚本执行只用了不到7分钟!!
1>>>runfile('D:/ftp资源下载/ftp批量下载文件.py', wdir='D:/ftp资源下载')
2正在读取原始对局日志......
3第1个表格已经合并
4第2个表格已经合并
5第3个表格已经合并
6在该目录下有3个文件
7正在合并,请稍等......
8合并完成......
9合并数据消耗时长:10.29 秒
10
11一共81个可用对局文件
12220 (vsFTPd 3.0.2)
13ftp连接成功
14
15正在下载文件
16共18个文件下载失败,共63个文件下载完成
17共63个有效对局文件~
18
19ftp连接已关闭
20FTP数据下载消耗时长:395.89 秒
21
22正在进行数据转化
23共18个文件转化失败,共63个文件转化完成
24本次累积消耗时长:407.21 秒